feat: support attribute insertion in OTAP transform #1737
Conversation
Codecov Report

❌ Patch coverage is

Additional details and impacted files:

```
@@            Coverage Diff             @@
##             main    #1737      +/-   ##
==========================================
+ Coverage   84.64%   84.72%   +0.08%
==========================================
  Files         502      502
  Lines      148560   149864    +1304
==========================================
+ Hits       125745   126975    +1230
- Misses      22281    22355      +74
  Partials      534      534
```
```rust
/// Helper to extract string key from a record batch key column at a given index.
/// Handles both plain Utf8 and Dictionary-encoded columns.
fn get_key_at_index(key_col: &ArrayRef, idx: usize) -> Option<String> {
```
There's a helper type that can encapsulate this logic: `otap_df_pdata::arrays::StringArrayAccessor`

```rust
pub(crate) type StringArrayAccessor<'a> = MaybeDictArrayAccessor<'a, StringArray>;
```

Instead of having a method defined here specially for this, below in `create_inserted_batch` you could do:

```rust
let key_col = current_batch
    .column_by_name(consts::ATTRIBUTE_KEY)
    .map(StringArrayAccessor::try_new)
    .transpose()?;
```

Then you could probably return an error if the `key_col` ends up being `None`.

This type provides the `str_at` method, which returns an `Option<&str>`.
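For illustration, a minimal sketch of how the accessor might then be used when checking which keys already exist (only `try_new` and `str_at` come from the comment above; the surrounding loop and names are assumptions):

```rust
use otap_df_pdata::arrays::StringArrayAccessor;

let key_col = current_batch
    .column_by_name(consts::ATTRIBUTE_KEY)
    .map(StringArrayAccessor::try_new)
    .transpose()?;

// `str_at` transparently handles plain Utf8 and dictionary-encoded key columns.
if let Some(keys) = &key_col {
    for idx in 0..current_batch.num_rows() {
        if let Some(key) = keys.str_at(idx) {
            // ... compare `key` against the attributes being inserted ...
        }
    }
}
```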
```rust
})?;
let key_type = schema.field(key_col_idx).data_type();

let new_keys: ArrayRef = match key_type {
```
Instead of using the regular arrow builders and having to write these match statements to append values to arrays that could be dict/native type, we have a set of helper types that can encapsulate this logic.

For example, `StringArrayBuilder`:

otel-arrow/rust/otap-dataflow/crates/pdata/src/encode/record/array.rs, lines 650 to 656 in 2c3976c:

```rust
pub type StringArrayBuilder = AdaptiveArrayBuilder<
    String,
    NoArgs,
    StringBuilder,
    StringDictionaryBuilder<UInt8Type>,
    StringDictionaryBuilder<UInt16Type>,
>;
```

This type also exposes an `append_str_n` method which can be used to append the same string multiple times. This is usually faster than appending the values one at a time. So an optimization we could make here, if we're appending the same key multiple times, would be to use this method.

There are similar builders for the other types we're inserting below as values (int, double, bool).
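As a very rough sketch of how the key column could be built with the helper (the constructor and `finish` calls below are placeholders rather than the real API; only `append_str_n` is taken from the comment above):

```rust
// Hypothetical sketch: check crates/pdata/src/encode/record/array.rs for the
// real constructor/finalization methods of StringArrayBuilder.
let mut key_builder = StringArrayBuilder::new(/* dictionary options */);

// Bulk-append the inserted key once per new row; a single call is usually
// faster than appending in a per-row loop.
key_builder.append_str_n(&inserted_key, rows_to_insert);

let new_keys: ArrayRef = key_builder.finish();
```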
```rust
})?;

// Build a set of (parent_id, key) pairs that already exist
let mut existing_keys: BTreeMap<u16, BTreeSet<String>> = BTreeMap::new();
```
I'm wondering if there are any ways we could optimize building up the set of attributes we're going to insert.

For example, using a `BTreeMap` here, and a `BTreeSet` below for `unique_parents`, means that we'll be looking up every parent_id multiple times. It might be faster to use a `RoaringBitmap` for the `unique_parent_ids`, and maybe we could have a `RoaringBitmap` for each insert entry corresponding to whether the row with some parent_id contains the attribute.
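A rough sketch of that idea, assuming the `roaring` crate; `inserts`, `keys`, `insert.key`, and the u16 parent ID type are stand-ins for whatever the PR actually uses:

```rust
use roaring::RoaringBitmap;

// One bitmap for all parent_ids seen, plus one bitmap per attribute we plan to
// insert, marking which parent_ids already carry that key.
let mut unique_parent_ids = RoaringBitmap::new();
let mut has_key: Vec<RoaringBitmap> = vec![RoaringBitmap::new(); inserts.len()];

for idx in 0..parent_ids_arr.len() {
    // `value(idx)` yields u16 in the current code; widen to the bitmap's u32.
    let parent_id = u32::from(parent_ids_arr.value(idx));
    let _ = unique_parent_ids.insert(parent_id);
    if let Some(key) = keys.str_at(idx) {
        for (i, insert) in inserts.iter().enumerate() {
            // `insert.key` is a placeholder for the configured attribute key.
            if insert.key == key {
                let _ = has_key[i].insert(parent_id);
            }
        }
    }
}
// Rows to add for insert `i`: parent_ids in `unique_parent_ids` but not in `has_key[i]`.
```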
```rust
let parent_ids_arr = parent_ids
    .as_any()
    .downcast_ref::<PrimitiveArray<UInt16Type>>()
    .ok_or_else(|| Error::ColumnDataTypeMismatch {
        name: consts::PARENT_ID.into(),
        expect: DataType::UInt16,
        actual: parent_ids.data_type().clone(),
    })?;
```
The parent_ids won't always be u16. For example, for metrics data point attributes and attributes for span links and span events, the parent ID type can be u32. Also, for u32 IDs, the `parent_id_arr` can be dictionary encoded (e.g. it's not always a `PrimitiveArray`).

To handle this, we might need to make the function generic over `T` where:

```rust
T: ParentId,
<T as ParentId>::ArrayType: ArrowPrimitiveType,
```

(You'll see we do something similar in this file for `materialize_parent_id_for_attributes`.)

Then we can get the parent_ids as:

```diff
-let parent_ids_arr = parent_ids
-    .as_any()
-    .downcast_ref::<PrimitiveArray<UInt16Type>>()
-    .ok_or_else(|| Error::ColumnDataTypeMismatch {
-        name: consts::PARENT_ID.into(),
-        expect: DataType::UInt16,
-        actual: parent_ids.data_type().clone(),
-    })?;
+let parent_ids_arr = MaybeDictArrayAccessor::<PrimitiveArray<T::ArrayType>>::try_new(
+    get_required_array(record_batch, consts::PARENT_ID)?,
+)?;
```

To ensure we handle u32 parent IDs correctly, it probably also makes sense to add a test for this somewhere.
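Putting those pieces together, a hedged sketch of the generic shape (the real parameter list and return type of `create_inserted_batch` in the PR may differ; only the bounds and the accessor call come from the suggestion above):

```rust
// Sketch only: the parameter list and return type here are illustrative.
fn create_inserted_batch<T>(record_batch: &RecordBatch /* , ... */) -> Result<RecordBatch, Error>
where
    T: ParentId,
    <T as ParentId>::ArrayType: ArrowPrimitiveType,
{
    // Works for u16 and u32 parent IDs, including dictionary-encoded u32 columns.
    let parent_ids_arr = MaybeDictArrayAccessor::<PrimitiveArray<T::ArrayType>>::try_new(
        get_required_array(record_batch, consts::PARENT_ID)?,
    )?;
    // ... build the rows to insert using `parent_ids_arr` ...
    todo!()
}
```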
```rust
// We collect columns into a map or vec matching schema order.
let mut columns = Vec::with_capacity(schema.fields().len());

for field in schema.fields() {
```
There's a case that the logic here might not handle correctly, which is if we're inserting a type of attribute and the original schema did not previously contain any attributes of this type.

All the `ATTRIBUTE_*` columns are optional in OTAP. For example, if some attribute `RecordBatch` had no values of type int, the `ATTRIBUTE_INT` column would be omitted. If we encountered such a record batch and we were inserting an integer attribute, it would not be included in the original batch with this logic.

We should add a test for this and handle it. We might need to write some custom batch concatenation logic for this, rather than relying on arrow's `concat_batches` compute kernel, or make some modifications to the original batch before we invoke this function.
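To illustrate one way the missing-column case could be handled before concatenation (a sketch only: the `ATTRIBUTE_INT` constant, the `Int64` value type, and reuse of the `rb`/`Error::Format` names from the snippets in this PR are assumptions):

```rust
use std::sync::Arc;
use arrow::array::new_null_array;
use arrow::datatypes::{DataType, Field, Schema};

// Append an all-null column of the value type being inserted to the original
// batch, so its schema matches the batch of new rows before concatenation.
let missing_field = Field::new(consts::ATTRIBUTE_INT, DataType::Int64, true);

let mut fields: Vec<Field> = rb
    .schema()
    .fields()
    .iter()
    .map(|f| f.as_ref().clone())
    .collect();
fields.push(missing_field.clone());

let mut columns = rb.columns().to_vec();
columns.push(new_null_array(missing_field.data_type(), rb.num_rows()));

let extended = RecordBatch::try_new(Arc::new(Schema::new(fields)), columns)
    .map_err(|e| Error::Format { error: e.to_string() })?;
```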
I think this has been fixed. The necessary metadata is added to the schema in `extend_schema_for_inserts` before the insertion.
```rust
let combined = arrow::compute::concat_batches(&rb.schema(), &[rb, new_rows])
    .map_err(|e| Error::Format {
        error: e.to_string(),
    })?;
```
I mentioned in https://github.com/open-telemetry/otel-arrow/pull/1737/files#r2704902249 that we might need to be careful about how we invoke this `concat_batches` function due to the case where we're inserting a column that was not contained in the original batch.

There's another case we probably need to handle as well, which is: if the column is dictionary encoded and inserting the new value would cause the dictionary to overflow, then we need to either expand the key type (e.g. convert from a `Dict<UInt8>` to a `Dict<UInt16>`) or convert from a dictionary to a non-dictionary-encoded array.
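For the overflow case, arrow's `cast` compute kernel can do both conversions; a sketch, with `col` standing in for the overflowing dictionary-encoded string column:

```rust
use arrow::compute::cast;
use arrow::datatypes::DataType;

// Widen the dictionary key type (UInt8 -> UInt16 keys) ...
let widened = cast(
    &col,
    &DataType::Dictionary(Box::new(DataType::UInt16), Box::new(DataType::Utf8)),
)
.map_err(|e| Error::Format { error: e.to_string() })?;

// ... or fall back to a plain, non-dictionary array.
let plain = cast(&col, &DataType::Utf8)
    .map_err(|e| Error::Format { error: e.to_string() })?;
```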
albertlockett left a comment
@ThomsonTan thanks for taking this! I'm really excited to see this desperately needed feature getting implemented and this looks like a great start!
I left a few comments, mostly around where we could use different types to simplify the code, some edge cases in the OTAP protocol and suggestions for optimizations.
WRT optimization, I have a few general comments:

- We could probably do them in a follow-up PR after adding new benchmarks to the existing suite in `benchmarks/benches/attribute_transform/main.rs`. Having benchmarks will give us confidence that the optimizations we're adding are actually effective.
- Another optimization we could consider in the future would be: currently we're materializing the `RecordBatch` for the rename & deletes, then materializing another for the inserts, and concatenating them together. This means that for each column, we create two `Arc<dyn Array>`, and afterwards discard them while concatenating them into a new `Arc<dyn Array>` for the final result. We might be able to avoid this by:
  a) inserting the new keys while we're doing the rename/delete
  b) inserting the new values while taking the values/parent_id columns
I realize that this makes the implementation significantly more complex, so it's fine if we want to just document this as a future optimization. The only reason I'm calling it out ahead of time is that some of the code we write to handle OTAP edge cases (see https://github.com/open-telemetry/otel-arrow/pull/1737/files#r2704924006) would be different with this optimization in place.
```diff
-let should_materialize_parent_ids =
-    any_rows_deleted && schema.column_with_name(consts::PARENT_ID).is_some();
+let should_materialize_parent_ids = (any_rows_deleted || insert_needed)
+    && schema.column_with_name(consts::PARENT_ID).is_some();
```
Co-authored-by: albertlockett <[email protected]>
Fix #1035